XplentyのAPIを使って複数のジョブ実行を試してみた
はじめに
データアナリティクス事業本部のkobayashiです。
XplentyはETL、ELTツールとして様々なデータソースを扱え、また処理もXplentyのGUIで直感的に作成できます。
XplentyにもAPIが用意されておりXplentyのマネージメントコンソールでAPI用のトークンを発行してそれを使うと便利にXplentyを扱えます。
Xplenty | Simplified ETL & ELT to BigQuery, Snowflake, Redshift & Azure
今回はこのAPIをPythonスクリプトから実行しクラスターの作成と複数のジョブを実行してみたいと思います。
APIで取得するデータ
前回作成したe-statのデータを取得するパッケージをXplentyのAPIで使って実行します。Xplentyのパッケージを作成するときに取得する統計データの統計表IDをユーザー変数にしました。これをジョブごとに変更し複数の統計データを取得します。
前回の記事
XplentyのAPIを使って複数ジョブを実行する
XplentyのAPIに関するドキュメントは以下にありますのでまずこれを確認します。 大抵のXplentyの操作はここに記載されているAPIから実行可能です。
今回使うAPIは以下ものを使います。
- クラスターリストを取得 : list-clusters
- クラスターを作成 : create-cluster
- パッケージリストを取得 : list-packages
- ジョブを実行 : run-job
今回は上記のAPIを使って以下の流れで進めます。
- アクセストークンを発行する。
- 実行したいパッケージのパッケージIDを確認する。併せて変数として渡す内容を確認する。
- 「クラスターを作成」、「クラスターが立ち上がるまで待つ」、「ジョブを実行する」Pythonスクリプトを作成し実行する。
アクセストークンの発行
はじめにXplentyのアクセストークンを発行します。Xplentyのマネージメントコンソールより取得します。
手順1). サイドバーのSettings
を押下すると設定画面に進むのでメニューからDeveloper Settings > Access tokens
を選択し設定画面に進む
手順2). Enter your password below to reveal your API key:
の欄があるのでアクセストークンが未発行の場合はログインパスワードを入力しReveal API key
を押下する
手順3).遷移先の画面にアクセストークンが表示される
アクセストークンが発行されたのでこれを使ってAPIを呼び出せます。
APIの呼び出し方法
認証は先に発行したアクセストークンをユーザー名、パスワードは空欄としてBasic認証にて行われます。
対象のAPIによりhttpメソッドは変わりますが、基本的な呼び出し方は以下になります。
curl -X POST -u {アクセストークン}: "https://api.xplenty.com/{アカウントID}/api/{API名}" \ -H "Accept: application/vnd.xplenty+json; version=2" \ -H "Content-Type: application/json" \ -d '{}'
アカウントIDについてはXplentyのマネージメントコンソールのSettings > Your Settings > Accounts
と進んだ先に表示されているAccount ID
になります。
パッケージIDの確認
ジョブを実行する際にはクラスターIDと実行するパッケージIDが必要になります。クラスターIDに関してはスクリプト中で取得しますが、パッケージIDについてはを予めAPIのパッケージリストを取得するAPI(list-packages)を使って確認しておきます。
- パッケージリストを確認するAPI呼び出し
curl -X GET -u {アクセストークン}: https://api.xplenty.com/classmethod-inc/api/packages?status=active -H "Accept: application/vnd.xplenty+json; version=2" \ -H "Content-Type: application/json"
- レスポンス
[ { "id": 128360, "name": "cm-kobayashi_test-kd", "description": null, "flow_type": "dataflow", "flow_version": "2.0.0", "owner_id": 11131, "created_at": "2020-06-13T07:40:46Z", "updated_at": "2020-07-01T21:30:56Z", "status": "active", "version_description": null, "package_version": "1.115", "author": "", "variables": { "appId": "'{e-statのアクセストークン}'", "statsDataId": "'8003000051'" }, "url": "https://api.xplenty.com/{アカウントID}/api/packages/128360", "html_url": "https://dashboard.xplenty.com/{アカウントID}/packages/128360/edit", "version": 116, "item_id": 128360 }, ... ]
レスポンスから対象のパッケージのid
を確認します。これを使ってジョブを実行します。
また併せてジョブ実行時に必要となるvariables
の内容も確認しておきます。Xplentyのパッケージを作成する際にユーザー変数を使うとAPIの呼び出し時にこの変数を上書きすることができます。この上書き機能を使うことで複数のジョブを簡単に実行することができます。
複数ジョブを実行するPythonスクリプトの作成
早速なのですが、以下が今回実行するスクリプトになります。
import pprint import requests import json import time URL = "https://api.xplenty.com/{アカウントID}/api" TOKEN = "{Xplentyのアクセストークン}" class XplentyApi(): def __init__(self): self.session = requests.Session() self.session.auth = (TOKEN, None) # クラスターリストを取得するメソッド def get_clusters(self): resp = self.session.get(URL + "/clusters?status=available", headers={ "Accept": "application/vnd.xplenty+json", "Content-Type": "application/json" }) return resp.json() # クラスターを作成するメソッド def create_cluster(self): param = { "nodes": 1, "type": "sandbox", "terminate_on_idle": True, "time_to_idle": 120 } print(param) resp = self.session.post( URL + "/clusters", json.dumps(param), headers={ "Accept": "application/vnd.xplenty+json; version=2", "Content-Type": "application/json", }) return resp.json() # ジョブを実行するメソッド def create_jobs(self, cluster_id, package_id, variables): param = { "cluster_id": cluster_id, "package_id": package_id, "variables": variables } resp = self.session.post( URL + "/jobs", json.dumps(param), headers={ "Accept": "application/vnd.xplenty+json; version=2", "Content-Type": "application/json", }) return resp.json() # 有効なクラスターを取得するメソッド def get_avalable_cluster_id(self): cluster_id = None clusters = self.get_clusters() if len(clusters) > 0: cluster_id = clusters[0]["id"] else: self.create_cluster() print("クラスターを起動しました。") # availableになるまで待機 while cluster_id is None: clusters = self.get_clusters() if len(clusters) > 0: cluster_id = clusters[0]["id"] else: print("クラスターを起動中") time.sleep(10) return cluster_id def main(): session = XplentyApi() cluster_id = session.get_avalable_cluster_id() # 取得するe-statの統計データの統計表ID # 平成27年国勢調査 小地域 年齢(5歳階級、4区分)別、男女別人口 の47都道府県分のデータ statsDataIds = [ 8003000047 , 8003000048 , 8003000049 , 8003000050 , 8003000051 , 8003000052 , 8003000053 , 8003000054 , 8003000055 , 8003000056 , 8003000057 , 8003000058 , 8003000059 , 8003000060 , 8003000061 , 8003000062 , 8003000063 , 8003000064 , 8003000065 , 8003000066 , 8003000067 , 8003000068 , 8003000069 , 8003000070 , 8003000071 , 8003000072 , 8003000073 , 8003000074 , 8003000075 , 8003000076 , 8003000077 , 8003000078 , 8003000079 , 8003000080 , 8003000081 , 8003000082 , 8003000083 , 8003000084 , 8003000085 , 8003000086 , 8003000087 , 8003000088 , 8003000089 , 8003000090 , 8003000091 , 8003000092 , 8003000093 ] for v in statsDataIds: variables = { "appId": "'{e-statのアクセストークン}'", "statsDataId": "'{}'".format(v) } resp = session.create_jobs(cluster_id, 128360, variables) pprint.pprint(resp) if __name__ == '__main__': main()
ポイントとなる箇所の解説を行います。
セッションをインスタンス変数へ格納
class XplentyApi(): def __init__(self): self.session = requests.Session() self.session.auth = (TOKEN, None)
いくつかhttpリクエストをいつくか送るのでその都度認証して…となると冗長になってしまうのでXplentyApi
クラスのインスタンス化時にセッションを使い回せるようにします。
XplentyのAPIを呼び出すメソッドの作成
# クラスターリストを取得するメソッド def get_clusters(self): resp = self.session.get(URL + "/clusters?status=available", headers={ "Accept": "application/vnd.xplenty+json", "Content-Type": "application/json" }) return resp.json() # クラスターを作成するメソッド def create_cluster(self): param = { "nodes": 1, "type": "sandbox", "terminate_on_idle": True, "time_to_idle": 120 } print(param) resp = self.session.post( URL + "/clusters", json.dumps(param), headers={ "Accept": "application/vnd.xplenty+json; version=2", "Content-Type": "application/json", }) return resp.json() # ジョブを実行するメソッド def create_jobs(self, cluster_id, package_id, variables): param = { "cluster_id": cluster_id, "package_id": package_id, "variables": variables } resp = self.session.post( URL + "/jobs", json.dumps(param), headers={ "Accept": "application/vnd.xplenty+json; version=2", "Content-Type": "application/json", }) return resp.json()
以下のそれぞれのXplentyのAPIを呼び出すメソッドを作成しています。リクエストの内容は詳しくは下記のドキュメントを参考にしてください。
- クラスターリストを取得 : list-clusters
- クラスターを作成 : create-cluster
- ジョブを実行 : run-job
今回は以下の条件になりますが、他にも設定はできますので公式ドキュメントをご確認ください。
- クラスターリストを取得
- ステータスが利用可能なクラスター
- クラスターを作成
- サンドボックス用のクラスター
- 無可動時間が2分でクラスターを削除
- ジョブを実行
- クラスターID、ジョブID、ユーザー変数を指定
有効なクラスターを取得するメソッド
# 有効なクラスターを取得するメソッド def get_avalable_cluster_id(self): cluster_id = None clusters = self.get_clusters() if len(clusters) > 0: cluster_id = clusters[0]["id"] else: self.create_cluster() print("クラスターを起動しました。") # availableになるまで待機 while cluster_id is None: clusters = self.get_clusters() if len(clusters) > 0: cluster_id = clusters[0]["id"] else: print("クラスターを起動中") time.sleep(10) return cluster_id
ジョブを実行できるクラスターがあればそのクラスターIDを返し、なければクラスターを作成して利用できるまで待機した後にクラスターIDを返すメソッドになります。
実際にジョブを実行するブロック
def main(): session = XplentyApi() cluster_id = session.get_avalable_cluster_id() # 取得するe-statの統計データの統計表ID # 平成27年国勢調査 小地域 年齢(5歳階級、4区分)別、男女別人口 の47都道府県分のデータ statsDataIds = [ 8003000047 ... , 8003000093 ] for v in statsDataIds: variables = { "appId": "'{e-statのアクセストークン}'", "statsDataId": "'{}'".format(v) } resp = session.create_jobs(cluster_id, 128360, variables) pprint.pprint(resp)
XplentyApi
をインスタンス化し有効なクラスターIDを取得します。取得したクラスターIDを用いて統計表IDのリストから順次統計IDを取得し、それをXplentyのAPIを呼び出す際にユーザー変数として与えて目的の統計データを取得するジョブ(パッケージID:128360
)を実行しています。
Pythonスクリプトの実行と確認
上記のスクリプトを実行すると以下の様な形でジョブが実行されます。
$ python xplenty-api.py {'nodes': 1, 'type': 'sandbox', 'terminate_on_idle': True, 'time_to_idle': 120} クラスターを起動しました。 クラスターを起動中 クラスターを起動中 クラスターを起動中 クラスターを起動中 クラスターを起動中 {'cluster_id': 1272794, 'completed_at': None, 'component': {'name': '', 'type': ''}, 'created_at': '2020-07-02T07:16:17Z', 'errors': None, 'failed_at': None, 'id': 35966114, 'outputs': [], 'outputs_count': 0, 'owner_id': 11131, 'package_id': 128360, 'progress': 0.0, 'runtime_in_seconds': 0, 'started_at': None, 'status': 'pending', 'updated_at': '2020-07-02T07:16:17Z', 'variables': { 'appId': "'{e-statのアクセストークン}'", 'statsDataId': "'8003000047'" } }
ジョブの状況をXplentyのマネージメントコンソールから確認すると複数のジョブが実行されていることが確認できます。
まとめ
XplentyのAPIを使ってクラスターの作成とジョブの実行を行ってみました。同じ処理だけど一部の条件が違うジョブを複数実行したい場合は、
- パッケージ作成時にユーザー変数として登録する
- XplentyのAPIを呼び出す先にユーザー変数を逐次変更してジョブを実行する
で可能でした。
最後まで読んで頂いてありがとうございました。